package com.example.zookeeper.config.kafka;

import com.alibaba.fastjson.JSON;
import com.example.zookeeper.common.constant.Contants;
import com.example.zookeeper.entity.User;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

/**
 * Kafka listener component: receives polled record batches and acknowledges
 * them manually.
 */
@Slf4j
@Component
public class KafkaListeners {

    /**
     * Pause applied after each record, in milliseconds.
     *
     * <p>NOTE(review): this looks like a simulated slow-processing delay left in for
     * testing — confirm whether it is still needed. With large batches the cumulative
     * pause may exceed the consumer's {@code max.poll.interval.ms} and trigger a
     * rebalance.
     */
    private static final long PER_RECORD_DELAY_MS = 20_000L;

    /**
     * Consumes a batch of records from the first topic listed in the comma-separated
     * {@code kafka.listener.topics} property, using the {@code kafkaBatchListener6}
     * container factory and the {@code kafka.consumer.group-id} consumer group.
     *
     * <p>Each record is logged, followed by a pause of {@link #PER_RECORD_DELAY_MS}.
     * The batch is acknowledged exactly once, and only after every record has been
     * handled. If the thread is interrupted or an unexpected exception occurs, the
     * batch is not acknowledged and the failure is logged; the exception is not
     * propagated to the listener container.
     *
     * @param records the records delivered for this batch
     * @param ack     handle used to manually acknowledge the batch
     */
    @KafkaListener(containerFactory = "kafkaBatchListener6", groupId = "${kafka.consumer.group-id}", topics = {"#{'${kafka.listener.topics}'.split(',')[0]}"})
    public void batchListener(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
        if (records == null || records.isEmpty()) {
            // Nothing was delivered, so there is nothing to process or acknowledge.
            return;
        }
        log.info("Received batch of {} record(s)", records.size());

        try {
            for (ConsumerRecord<?, ?> record : records) {
                log.info("接收到消息：\n{}", record);
                Thread.sleep(PER_RECORD_DELAY_MS);
            }
            // Acknowledge once per batch, after all records were handled, instead of
            // once per record before the work is done.
            ack.acknowledge();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can observe the interruption,
            // and stop working on the rest of the batch. The batch stays unacknowledged.
            Thread.currentThread().interrupt();
            log.warn("Kafka batch listener interrupted; batch was not acknowledged", e);
        } catch (Exception e) {
            log.error("Kafka监听异常" + e.getMessage(), e);
        }
    }

}
